Add auto-chunking and concurrent dispatch for bulk record operations #162
abelmilash-msft wants to merge 25 commits into
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…S to 3 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…3, fix minor docstrings Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove examples/advanced/contextvar_thread_demo.py (internal debugging artifact, not intended for the public repo). Trim redundant clause from the large-batch README tip. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace silent capping in _dispatch_chunks with an explicit UserWarning so callers are informed when their max_workers value is reduced. Revert _MAX_WORKERS to 3. Remove two internal implementation comments from _upsert_multiple. Update tests to assert the warning is emitted. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace 'silently capped' with 'capped to _MAX_WORKERS' throughout _odata.py to be consistent with the UserWarning now emitted on cap. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds built-in chunking (1,000-record batches) and optional concurrent dispatch for bulk record operations, including retry-with-jitter on transient errors and thread ContextVar propagation.
Changes:
- Introduces _dispatch_chunks to run chunked bulk requests sequentially or via ThreadPoolExecutor, with per-chunk transient retry logic and ContextVar propagation.
- Extends records.create/update/upsert and dataframe.create/update APIs with max_workers forwarding (default 1) and updates docs/examples accordingly.
- Adds/updates unit tests for chunking boundaries, concurrency ordering, retries, warnings/capping, and picklist cache cold-start locking.
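The chunk-then-dispatch flow described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's `_dispatch_chunks`: the names `split_into_chunks`, `dispatch_chunks_sketch`, and `send_chunk` are hypothetical, and only the 1,000-record chunk size and the use of `ThreadPoolExecutor` come from the overview.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")

CHUNK_SIZE = 1000  # batch size stated in the PR overview


def split_into_chunks(items: Sequence[T], size: int = CHUNK_SIZE) -> List[Sequence[T]]:
    """Split items into consecutive slices of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def dispatch_chunks_sketch(
    items: Sequence[T],
    send_chunk: Callable[[Sequence[T]], List[R]],  # hypothetical per-chunk request
    max_workers: int = 1,
) -> List[R]:
    """Send each chunk and return the flattened results in input order."""
    chunks = split_into_chunks(items)
    if max_workers == 1 or len(chunks) <= 1:
        # Sequential path: one request after another.
        per_chunk = [send_chunk(chunk) for chunk in chunks]
    else:
        # Concurrent path: Executor.map yields results in submission order,
        # so the output order matches the input order.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            per_chunk = list(pool.map(send_chunk, chunks))
    return [result for chunk_result in per_chunk for result in chunk_result]
```

Relying on `Executor.map` keeps ordering simple; an implementation that collects futures as they complete would need to reorder results itself.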
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_records_operations.py | Updates mocks/assertions to include max_workers forwarding. |
| tests/unit/test_dataframe_operations.py | Adds max_workers forwarding assertions and new forwarding-focused tests. |
| tests/unit/test_client.py | Updates client-level tests for new max_workers argument propagation. |
| tests/unit/data/test_multiple_chunking.py | Adds comprehensive chunking/concurrency/retry/cache-lock tests (new file). |
| src/PowerPlatform/Dataverse/operations/records.py | Adds max_workers kwarg to public record APIs + validation + docs. |
| src/PowerPlatform/Dataverse/operations/dataframe.py | Forwards max_workers into record operations + updates docs. |
| src/PowerPlatform/Dataverse/data/_odata.py | Implements chunking, concurrency, retry-with-jitter, ContextVar propagation, picklist cache locking. |
| src/PowerPlatform/Dataverse/claude_skill/dataverse-sdk-use/SKILL.md | Documents auto-chunking and max_workers behavior. |
| .claude/skills/dataverse-sdk-use/SKILL.md | Mirrors skill doc update for auto-chunking and concurrency. |
| examples/advanced/walkthrough.py | Adds walkthrough section demonstrating auto-chunking; renumbers later sections. |
| README.md | Documents max_workers and large-batch auto-chunking semantics. |
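The ContextVar propagation mentioned in the `_odata.py` row can be illustrated with a small sketch. It assumes a hypothetical `correlation_id` variable and a hypothetical `run_with_context` helper; the only detail taken from the PR is that each submitted task gets its own `copy_context()` snapshot.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical context variable standing in for e.g. a correlation ID.
correlation_id = contextvars.ContextVar("correlation_id", default=None)


def read_correlation_id(_chunk):
    """Worker that reports the correlation ID visible in its thread."""
    return correlation_id.get()


def run_with_context(chunks, worker, max_workers=2):
    """Run worker(chunk) in a pool, each call inside its own context snapshot."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # copy_context() is evaluated here, in the submitting thread, once per
        # task. A single Context object cannot be entered by two threads at
        # the same time, hence one snapshot per future.
        futures = [
            pool.submit(contextvars.copy_context().run, worker, chunk)
            for chunk in chunks
        ]
        return [future.result() for future in futures]
```

Changes a worker makes to the variable stay inside its snapshot and do not leak back to the caller or to other workers.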
… validation
- Fix .. note:: in records.create/update/upsert to mention concurrent dispatch when max_workers > 1, not just sequential chunking (comment #1)
- Import _MAX_WORKERS in records.py and build the ValueError message from it so the text stays accurate if the cap changes (comment #2)
- Add explicit ValueError in _dispatch_chunks for non-int or < 1 max_workers, with three new tests covering zero, negative, and non-int inputs (comments #4/#5)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
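The validation rules in this commit message (reject non-int or `< 1`, warn and cap above the maximum) might look roughly like the sketch below. `resolve_max_workers` is a hypothetical helper, the error text is illustrative, and rejecting `bool` is an assumption of this sketch; the cap value of 3 and the warning text follow what the PR shows.

```python
import warnings

_MAX_WORKERS = 3  # cap value named in the PR


def resolve_max_workers(max_workers):
    """Validate max_workers and return the effective worker count."""
    # Assumption: bool is rejected even though it subclasses int.
    if isinstance(max_workers, bool) or not isinstance(max_workers, int) or max_workers < 1:
        raise ValueError(
            f"max_workers must be an integer >= 1 (values above {_MAX_WORKERS} "
            f"are capped), got {max_workers!r}"
        )
    if max_workers > _MAX_WORKERS:
        # Warn instead of raising, so oversized values still work.
        warnings.warn(
            f"max_workers={max_workers} exceeds the maximum of {_MAX_WORKERS}; "
            f"capping to {_MAX_WORKERS}.",
            UserWarning,
            stacklevel=2,
        )
        return _MAX_WORKERS
    return max_workers
```

Building the message from `_MAX_WORKERS` keeps the text accurate if the cap changes, which is the point of comment #2.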
Azure Pipelines: Successfully started running 1 pipeline(s).
    warnings.warn(
        f"max_workers={max_workers} exceeds the maximum of {_MAX_WORKERS}; capping to {_MAX_WORKERS}.",
        UserWarning,
        stacklevel=2,
Should this be stacklevel=4 for the warning?
Also, either move the warning after the single-chunk check, or add 'no concurrency will be used' to the warning message when len(chunks) == 1.
Updated this on the latest commit (5b2f6e9)
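For context on the `stacklevel` question in this thread: the value controls which stack frame a warning is attributed to. The sketch below uses three hypothetical nested functions to show the effect; the right value for the SDK depends on how many internal frames sit between the `warnings.warn` call and the caller's code, which is not shown here.

```python
import warnings


def _inner(stacklevel):
    # stacklevel=1 attributes the warning to this line.
    warnings.warn("capped", UserWarning, stacklevel=stacklevel)


def _middle(stacklevel):
    # stacklevel=2 attributes the warning to this call.
    _inner(stacklevel)


def public_api(stacklevel):
    # stacklevel=3 attributes the warning to this call.
    _middle(stacklevel)


def warning_lineno(stacklevel):
    """Return the line number the warning is attributed to."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # stacklevel=4 attributes the warning to this call, i.e. to the
        # code that invoked the public API.
        public_api(stacklevel)
    return caught[0].lineno
```

With two internal frames between the public entry point and the `warn` call, as in this sketch, `stacklevel=4` is what points the message at user code.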
…evel=4 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
5747ef0 to d867715
d867715 to 9a1f490
    Lists exceeding 1,000 records are automatically split into chunks
    of up to 1,000 records; dispatched sequentially by default, or
    concurrently when ``max_workers > 1`` (capped to ``_MAX_WORKERS``).
    This is **not atomic** — a failure mid-way may leave earlier chunks
Sorry, I missed this in the first pass.
With chunking, the user's atomic request is effectively converted into multiple non-atomic operations, which changes the original behavior. We should make this an overload, so that the default behavior stays as-is.
I'm proposing
From:
    ids = client.records.create("account", payloads, max_workers=1)
To:
    ids = client.records.create("account", payloads)  # default: non_atomic_chunking=False, sends all
    ids = client.records.create("account", payloads, non_atomic_chunking=True)  # sends in chunks; the parameter name makes clear that chunking is not atomic
Also hide the number of workers, which is internal and can be optimized by the SDK client without input from the caller.
Summary
Add chunking with concurrency to address #156
- records.create, records.update, and records.upsert now automatically split inputs exceeding 1,000 records into 1,000-record chunks; no caller intervention is required.
- max_workers parameter (default 1, max 3) enables concurrent chunk dispatch via ThreadPoolExecutor. Values above the cap emit a UserWarning rather than raising.
- prefetch_pages parameter (default 0, sequential) on _get_multiple: when set to 1, the next-page HTTP request is submitted immediately after receiving the current page, before yielding to the caller, so network I/O overlaps with per-page processing (e.g. transforms, DB writes); values above 1 are capped at 1.
- Per-chunk retries on transient errors wait the Retry-After duration plus random jitter to desynchronise concurrent workers.
- ContextVar values (e.g. correlation IDs) are propagated to worker threads via copy_context(); each future gets its own snapshot to avoid cross-thread context corruption.
- dataframe.create and dataframe.update forward max_workers through to the record-level operations.

Load test results
Tested against a live Dataverse environment (25,000 records).
Concurrency comparison — 3,000 records (3 chunks of 1,000)
High-volume load — 25,000 records (25 chunks of 1,000, max_workers=15)
Atomicity note
Chunked operations are not atomic. If a chunk fails mid-way, earlier chunks are already committed. Callers that require atomicity should limit input to ≤ 1,000 records per call.
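Because earlier chunks stay committed when a later one fails, a caller may want to know exactly how far a bulk call got. One way to do that on the caller side is sketched below. `PartialBulkFailure`, `create_in_tracked_chunks`, and `create_chunk` are hypothetical names, not part of the SDK; `create_chunk` stands in for any call that creates up to 1,000 records and returns their IDs.

```python
class PartialBulkFailure(Exception):
    """Hypothetical error carrying what was committed before the failure."""

    def __init__(self, committed_ids, failed_offset, cause):
        super().__init__(
            f"chunk starting at offset {failed_offset} failed after "
            f"{len(committed_ids)} records were committed: {cause}"
        )
        self.committed_ids = committed_ids
        self.failed_offset = failed_offset


def create_in_tracked_chunks(create_chunk, payloads, chunk_size=1000):
    """Create records chunk by chunk, reporting progress on failure."""
    committed = []
    for start in range(0, len(payloads), chunk_size):
        try:
            committed.extend(create_chunk(payloads[start:start + chunk_size]))
        except Exception as exc:
            # Earlier chunks are already committed; surface their IDs so the
            # caller can resume from failed_offset or clean up.
            raise PartialBulkFailure(committed, start, exc) from exc
    return committed
```

The caller can then retry from `failed_offset` or delete the records listed in `committed_ids`, depending on which recovery strategy fits.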
Unit tests
test_multiple_chunking.py is a new test file introduced by this PR; it did not exist in main.
Coverage on changed public modules (full suite):
- operations/records.py
- operations/dataframe.py
Tests cover: auto-chunking boundaries (999/1000/1001 records), sequential and concurrent dispatch, chunk ordering, transient retry with jitter, ContextVar propagation to worker threads, max_workers cap warning, and exception propagation.
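The "transient retry with jitter" behaviour covered by these tests can be sketched as follows. This is an illustration under stated assumptions rather than the PR's code: `TransientError`, `call_with_retry`, the attempt limit, and the jitter range are all hypothetical. Only the idea of waiting the Retry-After duration plus random jitter comes from the PR description.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical error for a retryable response carrying Retry-After."""

    def __init__(self, retry_after=0.0):
        super().__init__(f"transient failure, retry after {retry_after}s")
        self.retry_after = retry_after


def call_with_retry(fn, max_attempts=3, max_jitter=0.5, sleep=time.sleep):
    """Call fn(), retrying transient failures after Retry-After plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: propagate to the caller
            # Random jitter spreads out workers that were throttled together,
            # so they do not all retry at the same instant.
            sleep(exc.retry_after + random.uniform(0, max_jitter))
```

Passing `sleep` as a parameter lets a test substitute a recorder for `time.sleep` and check the computed delays without actually waiting.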